\section{Implementations}\label{sec:implementation}

\subsection{\dbsp Rust library}

We have built an implementation of \dbsp as part of an open-source
project with an MIT license:
\url{https://github.com/vmware/database-stream-processor}.
The implementation consists of a Rust library and a runtime.  The
library provides APIs for basic algebraic data types: such as groups,
finite maps, \zr, indexed \zr.  A separate circuit construction API
allows users to create \dbsp circuits by placing operator nodes
(corresponding to boxes in our diagrams) and connecting them with
streams, which correspond to the arrows in our diagrams.  The library
provides pre-built generic operators for integration, differentiation,
delay, nested integration and differentiation, and a rich library of
\zr basic incremental operators: corresponding to plus, negation,
grouping, joining, aggregation, $\distinct$, flatmap, window
aggregates, etc.

For iterative computations the library provides the $\delta_0$
operator and an operator that approximates $\int$ by terminating
iteration of a loop at a user-specified condition (usually the
condition is the requirement for a zero to appear in a specified
stream).  The low level library allows users to construct incremental
circuits manually by stitching together incremental versions of
various primitive operators.

The library has also support for multicore execution of \zr operators
(using a natural sharding strategy), and a variety of adaptors for
external data sources (e.g., Kafka, CSV files, etc).  The library can
also spill internal operator state to persistent storage.

\subsection{SQL compiler}

We have also built a SQL to \dbsp compiler, which can translate
standard SQL queries into \dbsp circuits.  The compiler implements
Algorithm~\ref{algorithm-inc}, which can be used to generate the
streaming version of any expressible SQL query.  The compiler is also
an open-source project
\url{https://github.com/vmware/sql-to-dbsp-compiler} with an MIT
license.  The compiler front-end parser and optimizer is based on the
Apache Calcite~\cite{begoli-icmd18} infrastructure.  The project is
mature enough to pass essentially all 7 million SQL Logic
Tests~\cite{sqllogictest}.  The compiler handles all aspects of SQL,
including NULLs, ternary logic, grouping, aggregation, multiset
queries, etc.

\subsection{Formal verification}

As a third implementation, we have formalized and verified all the
definitions, lemmas, propositions, theorems, and examples in this
paper using the Lean theorem prover; we make these proofs available
at~\mihai{Tej?}.  This amounted to roughly 5K lines of Lean code.

\subsection{Additional Implementation Observations}\label{sec:implementation-additional}

\subsubsection{Checkpoint/restore}

\dbsp programs are stateful streaming systems.  Fault-tolerance and
migration for such programs requires state migration.  We claim that it is
sufficient to checkpoint and restore the "contents" of all $\zm$ operator
in order to migrate the state of a Ddlog computation.

\subsubsection{Maintaining a database}

\dbsp is not a database, it is just a streaming view maintenance
system.  In particular, \dbsp will not maintain more state than
absolutely necessary to compute the changes to the views.  There is no
way to find out whether a specific value exists at a specific time
moment within a \dbsp relation.  However, a simple extension to \dbsp
runtime can be made to provide a \emph{view query} API: essentially
all relations that may be queried have to be maintained internally in
an integrated form as well.  The system can then provide an API to
enumerate or query a view about element membership between input
updates.

\subsubsection{Materialized views}

An incremental view maintenance system is not a database -- it only
computes changes to views when given changes to tables.  However, it
can be integrated with a database, by providing capabilities for
\emph{querying} both tables and views.  An input table is just the
integral of all the changes to the table.  This makes possible
building a system that is both stateful (like a database) and
streaming (like an incremental view maintenance system).

\subsubsection{Maintaining input invariants}

For relational query systems there is however an important caveat: the
proofs about the correctness of the $C_Q$ implementing the same
semantics as $Q$ all require some preconditions on the circuit inputs.
In particular, the semantics of $Q$ is only defined for sets.  In
order for $C_Q$ to faithfully emulate the behavior of $Q$ we must
enforce the invariant that the input relations are in fact sets.

However, the differential streaming version of the circuits accepts an
arbitrary stream of changes to the input relations.  Not all such
streams define input relations that are sets!  For example, consider
an input stream where the first element removes a tuple from an input
relation.  The resulting \zr does not represent a set, and thus the
proof of correctness does not hold.  This problem has been well
understood in the context of the relational algebra: it is the same as
the notion of positivity from~\cite{green-tcs11}.

We propose three different solutions to this problem, in increasing
degrees of complexity.

\paragraph{Assume that the environment is well-behaved}

The simplest solution is to do nothing and assume that at any point in time
the integral of the input stream of changes $i$ is a set: $\forall t \in \N . \isset(\I(i)[t])$.
This may be a reasonable assumption if the changes come from a controlled
medium, e.g., a traditional database, where they represent legal database changes.

\paragraph{Normalize input relations to sets}

In order to enforce that the input relations are always sets it is sufficient
to apply a $\distinct$ operator after integration.

\begin{tikzpicture}[auto,node distance=1cm,>=latex]
    \node[] (input) {$i$};
    \node[block, right of=input] (I) {$\I$};
    \node[block, right of=I, node distance=1.5cm] (distinct) {$\distinct$};
    \node[block, right of=distinct, node distance=1.5cm] (q) {$\lift{C_Q}$};
    \node[block, right of=q] (D) {$\D$};
    \node[right of=D] (output) {$o$};
    \draw[->] (input) -- (I);
    \draw[->] (I) -- (distinct);
    \draw[->] (distinct) -- (q);
    \draw[->] (q) -- (D);
    \draw[->] (D) -- (output);
\end{tikzpicture}

The semantics of the resulting circuit is identical to the semantics of $\inc{\lift{C_Q}}$
for well-behaved input streams.  For non-well behaved input streams one can give
a reasonable definition: a change is applied to the input relations, and then non-relations
are normalized into relations.  Removing a non-existent element is a no-op, and adding twice
an element is the same as adding it once.

\paragraph{Use a ``change manager''}

In this solution we interpose a separate software component between the environment
and the circuit.  Let us call this a ``change manager'' (CM).  The CM
is responsible for accepting commands from the environment that perform updates on the input
relations, validating them, and building incrementally an input change, by computing the
effect of the commands.  The CM needs to maintain enough internal state to validate all commands; this
will most likely entail maintaining the full contents of the input tables.  Note that
the input tables can be computed as the $\I$ of all input deltas ever applied.  Once all
commands producing a change have been accepted, the environment can apply the produced input change
atomically, and obtain from the circuit the corresponding output changes.

\begin{tikzpicture}[auto,node distance=1cm,>=latex]
    \node[minimum width=.5cm, minimum height=1cm] (env) {Env};
    \node[block, right of=env, minimum width=.5cm, minimum height=1cm, node distance=1.5cm] (tm) {CM};
    \node[block, right of=tm, node distance=1.5cm] (C) {$\inc{\lift{C_Q}}$};
    \node[right of=C] (output) {$o$};
    \draw[->] (env.30) -- (tm.150);
    \draw[<-] (env.330) -- (tm.210);
    \draw[->] (tm) -- node (i) {$i$} (C);
    \draw[->] (C) -- (output);
\end{tikzpicture}
